PipeReader example requires internal "unsafe" class #169

Open
large opened this issue Jul 23, 2021 · 1 comment
large commented Jul 23, 2021

Hi @cosullivan, I am currently upgrading to v9.0 and had to rebuild my ListenFactory around pipes, as the design now requires.

In your example:

Console.WriteLine(">>> {0}", StringUtil.Create(readResult.Buffer));

StringUtil is "unsafe" because of the fixed buffers and pointers in the internal class. (I'm an old C++ programmer, which is why I write "unsafe". You know what you are doing, so it is not really unsafe... :))

Using your internal class StringUtil in an example means that anyone consuming the NuGet package has to reimplement the class, or build against the SmtpServer project itself, to get the example to compile. Wouldn't it be enough to use StreamReader/StreamWriter to handle pipes in your SmtpServer?
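For illustration, here is a minimal sketch of a replacement that needs no unsafe code. It assumes .NET Core 3.0 or later, where `Encoding.GetString` has an overload that accepts a `ReadOnlySequence<byte>`. The class name `SafeStringUtil` is hypothetical and not part of SmtpServer; it only mirrors the `StringUtil.Create` call from the example.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical stand-in for the internal StringUtil class (name is an assumption).
public static class SafeStringUtil
{
    // Decodes the whole buffer as UTF-8 using only safe, public framework APIs.
    // Works for both single-segment and multi-segment sequences.
    public static string Create(ReadOnlySequence<byte> buffer)
        => Encoding.UTF8.GetString(buffer);
}
```

With this in place, the logging line in the example could read `Console.WriteLine(">>> {0}", SafeStringUtil.Create(readResult.Buffer));` without referencing anything internal.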

large commented Jul 23, 2021

I played with your example to capture the full communication, including the server -> client direction.
Here is my suggestion for achieving that:

    public sealed class CustomSecurableDuplexPipe : ISecurableDuplexPipe
    {
        readonly ISecurableDuplexPipe _securableDuplexPipe;

        public CustomSecurableDuplexPipe(ISecurableDuplexPipe securableDuplexPipe)
        {
            _securableDuplexPipe = securableDuplexPipe;
        }

        public Task UpgradeAsync(X509Certificate certificate, SslProtocols protocols, CancellationToken cancellationToken = default)
        {
            return _securableDuplexPipe.UpgradeAsync(certificate, protocols, cancellationToken);
        }

        public void Dispose()
        {
            Console.WriteLine("Raw transcript of last connection:");
            foreach (var logLine in LogArray)
                Console.WriteLine(logLine);

            _securableDuplexPipe.Dispose();
        }

        // The list that actually holds the log
        private readonly List<String> LogArray = new List<String>();

        public PipeReader Input => new LoggingPipeReader(_securableDuplexPipe.Input, LogArray);

        public PipeWriter Output => new LoggingPipeWriter(_securableDuplexPipe.Output, LogArray);

        public bool IsSecure => _securableDuplexPipe.IsSecure;
    }

    //Quite similar to LoggingPipeReader and as simple as possible
    public sealed class LoggingPipeWriter : PipeWriter
    {
        readonly PipeWriter _delegate;
        readonly List<String> _logArray; //Store data until EOF

        public LoggingPipeWriter(PipeWriter @delegate, List<String> @logArray)
        {
            _delegate = @delegate;
            _logArray = @logArray;
        }

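        public override void Advance(int bytes)
        {
            // Re-request the current buffer to read back the bytes that were just written.
            // This assumes the underlying writer returns the same, not yet advanced, memory.
            var spandata = GetSpan().Slice(0, bytes);
            String str = Encoding.UTF8.GetString(spandata);
            _logArray.Add(str);
            _delegate.Advance(bytes);
        }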

        //Unused, but required overrides
        public override void CancelPendingFlush()
            => _delegate.CancelPendingFlush();

        public override void Complete(Exception exception = null)
            => _delegate.Complete(exception);

        public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
            => _delegate.FlushAsync(cancellationToken);

        public override Memory<byte> GetMemory(int sizeHint = 0)
            => _delegate.GetMemory(sizeHint);

        public override Span<byte> GetSpan(int sizeHint = 0)
            => _delegate.GetSpan(sizeHint);
    }


    //LoggingPipeReader with helper for getting data after each CRLF
    public sealed class LoggingPipeReader : PipeReader
    {
        readonly PipeReader _delegate;
        readonly List<String> _logArray;

        public LoggingPipeReader(PipeReader @delegate, List<String> @logArray)
        {
            _delegate = @delegate;
            _logArray = @logArray;
        }

        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
        {
            ReadResult readResult = await _delegate.ReadAsync(cancellationToken);

            // Work on a copy of the buffer so the caller still sees the unmodified result.
            ReadOnlySequence<byte> buffer = readResult.Buffer;

            // ProcessLine returns only the last complete line found in the buffer (or null).
            string tmp = ProcessLine(ref buffer);
            if (tmp != null)
                _logArray.Add(tmp);
            return readResult;
        }

        //Unused, but required overrides
        public override void AdvanceTo(SequencePosition consumed) => _delegate.AdvanceTo(consumed);
        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
            => _delegate.AdvanceTo(consumed, examined);
        public override void CancelPendingRead() => _delegate.CancelPendingRead();
        public override void Complete(Exception exception = null) => _delegate.Complete(exception);
        public override bool TryRead(out ReadResult result) => _delegate.TryRead(out result);

        //Helper method, copied as-is from: https://dev.to/joni2nja/evaluating-readline-using-system-io-pipelines-performance-in-c-part-2-pmf
        private static ReadOnlySpan<byte> NewLine => new[] { (byte)'\r', (byte)'\n' };
        private static string ProcessLine(ref ReadOnlySequence<byte> buffer)
        {
            string str = null;

            if (buffer.IsSingleSegment)
            {
                var span = buffer.FirstSpan;
                int consumed;
                while (span.Length > 0)
                {
                    var newLine = span.IndexOf(NewLine);

                    if (newLine == -1) break;

                    var line = span.Slice(0, newLine);
                    str = Encoding.UTF8.GetString(line);

                    // simulate string processing
                    //str = str.AsSpan().Slice(0, 5).ToString();

                    consumed = line.Length + NewLine.Length;
                    span = span.Slice(consumed);
                    buffer = buffer.Slice(consumed);
                }
            }
            else
            {
                var sequenceReader = new SequenceReader<byte>(buffer);

                while (!sequenceReader.End)
                {
                    while (sequenceReader.TryReadTo(out ReadOnlySequence<byte> line, NewLine))
                    {
                        str = Encoding.UTF8.GetString(line);

                        // simulate string processing
                        //str = str.AsSpan().Slice(0, 5).ToString();
                    }

                    buffer = buffer.Slice(sequenceReader.Position);
                    sequenceReader.Advance(buffer.Length);
                }
            }

            return str;
        }
    }

The helper for the reader seems to work quite well. Since the data is short, the buffer is always a single segment and the SequenceReader branch is never triggered, so that branch is probably unnecessary.
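If someone wants to exercise the multi-segment path anyway, one option is to build a segmented `ReadOnlySequence<byte>` by hand and run a `SequenceReader<byte>` over it. The sketch below does that. `ChunkSegment` and `MultiSegmentDemo` are hypothetical names made up for this illustration, and the line splitting is a simplified variant of the helper above, not a copy of it.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

// Hypothetical segment type, used only to chain several byte chunks together.
public sealed class ChunkSegment : ReadOnlySequenceSegment<byte>
{
    public ChunkSegment(ReadOnlyMemory<byte> memory) => Memory = memory;

    // Links a new segment after this one and returns it.
    public ChunkSegment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new ChunkSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

public static class MultiSegmentDemo
{
    static readonly byte[] CrLf = { (byte)'\r', (byte)'\n' };

    // Builds one sequence in which every input string becomes its own segment.
    public static ReadOnlySequence<byte> Build(params string[] chunks)
    {
        var first = new ChunkSegment(Encoding.ASCII.GetBytes(chunks[0]));
        var last = first;
        for (int i = 1; i < chunks.Length; i++)
            last = last.Append(Encoding.ASCII.GetBytes(chunks[i]));
        return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
    }

    // Returns every complete CRLF-terminated line, even if it spans segments.
    public static List<string> ReadLines(ReadOnlySequence<byte> buffer)
    {
        var lines = new List<string>();
        var reader = new SequenceReader<byte>(buffer);
        while (reader.TryReadTo(out ReadOnlySequence<byte> line, CrLf))
            lines.Add(Encoding.UTF8.GetString(line));
        return lines;
    }
}
```

For example, `MultiSegmentDemo.Build("MAIL FROM:<a@", "b.c>\r\nRC", "PT TO:<d@e.f>\r\n")` produces a sequence where `IsSingleSegment` is false, and `ReadLines` splits it into the two commands even though both are cut across segment boundaries.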
